Перейти к основному содержимому

2.09. Интеграционный поток

Всем

Что такое интеграционные потоки?

Интеграционный поток — это последовательность шагов, через которые проходят данные при перемещении между системами. Поток отражает логику обработки: от приёма сообщения до его доставки, включая возможные трансформации, проверки, маршрутизацию и реакцию на ошибки.

Поток описывает маршрут, триггеры, преобразования и точки принятия решений — от момента возникновения события до завершения обработки во всех задействованных компонентах.

Ключевые характеристики интеграционного потока:

  • Направленность: поток может быть однонаправленным (отправка уведомления), двунаправленным (запрос-ответ) или многоточечным (публикация в шину событий с несколькими подписчиками).
  • Оркестрация vs хореография: в оркестрованном потоке существует центральный координатор, управляющий последовательностью вызовов (например, BPM-движок). В хореографии каждая система реагирует на события независимо, без централизованного контроля.
  • Идемпотентность и атомарность: качественно спроектированный поток должен учитывать возможность повторной отправки сообщений (идемпотентность) и обеспечивать согласованность данных при частичных сбоях (через Saga-паттерн или компенсирующие транзакции).
  • Наблюдаемость: поток должен быть инструментирован — логирование, трассировка (distributed tracing), метрики — для диагностики и аудита.

Интеграционные потоки часто визуализируются в виде диаграмм последовательностей (sequence diagrams) или BPMN-схем. В промышленных платформах (например, BPMSoft, ELMA365, Apache NiFi) такие потоки могут конфигурироваться декларативно, без написания кода, что упрощает сопровождение и версионирование.


Основные типы интеграционных потоков

1. Однонаправленный поток (Fire-and-Forget)

Самый простой сценарий:

Событие произошло → данные отправлены → отправитель не ждёт подтверждения.

Пример:

  • Пользователь нажал «Заказать» → система учёта публикует событие order.created в шину → аналитическая система получает его и логирует.
  • Отправка логов в централизованное хранилище (например, через Fluentd → Kafka → Elasticsearch).

Особенности:

  • Минимальная задержка, высокая пропускная способность.
  • Нет гарантии доставки — если получатель недоступен, сообщение теряется (если не настроено сохранение на брокере).
  • Часто используется в event-driven архитектурах.

2. Синхронный запрос-ответ (Request-Response)

Классический REST- или gRPC-обмен:

Система А делает вызов → Система Б обрабатывает → возвращает результат → Система А продолжает работу только после ответа.

Пример:

  • Веб-приложение запрашивает баланс у платёжного сервиса (GET /balance?userId=123) → ждёт ответ {"amount": 49900} → показывает пользователю.

Особенности:

  • Простота отладки и понимания.
  • Блокирующая природа: если Б отвечает 5 секунд — А «висит».
  • Требует строгого SLA по времени ответа.

3. Асинхронный поток с подтверждением (Reliable Async)

Гибрид: данные уходят асинхронно, но с гарантией и обратной связью.

Отправитель → брокер сообщений → получатель обрабатывает → публикует ack/nack → отправитель реагирует.

Пример:

  • Создание заказа → запись в очередь orders.new → сервис обработки резервирует товар → при успехе публикует order.confirmed, при ошибке — order.failed → триггер уведомлений реагирует на confirmed, а служба поддержки — на failed.

Особенности:

  • Гарантированная доставка (при durable-очередях).
  • Возможность масштабировать обработку (много воркеров на одну очередь).
  • Поддержка идемпотентности и retry без дублирования.

4. Поток с трансформацией и маршрутизацией (ETL / ESB-стиль)

Данные проходят через «интеграционный конвейер»:

Источник → извлечение → очистка → преобразование → маршрутизация → несколько получателей.

Пример:

  • CRM выгружает контакты в CSV → интеграционный адаптер парсит, нормализует телефоны, обогащает геоданными → отправляет:
    • в рассылку (Mailchimp) — email и имя,
    • в аналитику (ClickHouse) — полный профиль,
    • в ERP (1С) — только ИНН и реквизиты.

Особенности:

  • Часто реализуется через iPaaS (например, Apache Camel, n8n, Node-RED) или ETL-инструменты (Talend, Airbyte).
  • Централизованное управление логикой потока.
  • Точка отказа и узкое место, если конвейер монолитный.

5. Цепочка компенсируемых транзакций (Saga)

Для распределённых операций, где ACID невозможен:

Шаг 1 → Шаг 2 → … → Шаг N
Если шаг K падает → запускаются компенсирующие действия: отмена K-1, отмена K-2, …

Пример бронирования путешествия:

  1. Забронировать рейс (flight.reserve) → получаем bookingId
  2. Забронировать отель (hotel.reserve)
  3. Списать деньги (payment.charge)
    → Если шаг 3 падает:
     → payment.refund (если деньги уже списаны)
     → hotel.cancel
     → flight.cancel

Особенности:

  • Сохраняет согласованность без блокировок.
  • Требует реализации обратных операций для каждого шага.
  • Используется в микросервисных системах, где распределённые транзакции невозможны.

Обязательные компоненты любого интеграционного потока

КомпонентРоль в потокеПримеры реализации
ТриггерИсточник запуска потокаAPI-вызов, cron, событие в БД (INSERT), webhook, изменение файла в S3
Очередь / шинаБуферизация, декуплингRabbitMQ, Apache Kafka, AWS SQS, Redis Streams
ТрансформерПриведение форматаJSON → XML, добавление заголовков, маппинг полей (userIdcustomer_id)
РоутерУсловная маршрутизацияЕсли country == "RU" → в 1С; иначе → в SAP
ВалидаторПроверка корректностиJSON Schema, кастомные правила («сумма > 0», «email валиден»)
Логгер / трейсерНаблюдаемостьOpenTelemetry, Jaeger, логи в Loki, метрики в Prometheus
Retry-механизмУстойчивостьЭкспоненциальная задержка, circuit breaker (Hystrix, Resilience4j)
КомпенсаторОткат при ошибкеОтмена брони, возврат средств, удаление временных записей